---
title: "Custom Middleware"
description: "AgentOS with custom middleware for rate limiting and request/response logging"
---

This example demonstrates how to create and add custom middleware to your AgentOS application. We implement two common middleware types: rate limiting and request/response logging.

## Code

```python custom_middleware.py
import time
from collections import defaultdict, deque
from typing import Dict

from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIChat
from agno.os import AgentOS
from agno.tools.duckduckgo import DuckDuckGoTools
from fastapi import Request, Response
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware


# === Rate Limiting Middleware ===
class RateLimitMiddleware(BaseHTTPMiddleware):
    """
    Rate limiting middleware that limits requests per IP address.
    """

    def __init__(self, app, requests_per_minute: int = 60, window_size: int = 60):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        self.window_size = window_size
        # Store request timestamps per IP
        self.request_history: Dict[str, deque] = defaultdict(deque)

    async def dispatch(self, request: Request, call_next) -> Response:
        # Get client IP
        client_ip = request.client.host if request.client else "unknown"
        current_time = time.time()

        # Clean old requests outside the window
        history = self.request_history[client_ip]
        while history and current_time - history[0] > self.window_size:
            history.popleft()

        # Check if rate limit exceeded
        if len(history) >= self.requests_per_minute:
            return JSONResponse(
                status_code=429,
                content={
                    "detail": f"Rate limit exceeded. Max {self.requests_per_minute} requests per minute."
                },
            )

        # Add current request to history
        history.append(current_time)

        # Process the request, then add rate limit headers to the response
        response = await call_next(request)
        response.headers["X-RateLimit-Limit"] = str(self.requests_per_minute)
        response.headers["X-RateLimit-Remaining"] = str(
            self.requests_per_minute - len(history)
        )
        response.headers["X-RateLimit-Reset"] = str(
            int(current_time + self.window_size)
        )

        return response


# === Request/Response Logging Middleware ===
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """
    Request/response logging middleware with timing and basic info.
    """

    def __init__(self, app, log_body: bool = False, log_headers: bool = False):
        super().__init__(app)
        self.log_body = log_body
        self.log_headers = log_headers
        self.request_count = 0

    async def dispatch(self, request: Request, call_next) -> Response:
        self.request_count += 1
        start_time = time.time()

        # Basic request info
        client_ip = request.client.host if request.client else "unknown"
        print(
            f"🔍 Request #{self.request_count}: {request.method} {request.url.path} from {client_ip}"
        )

        # Optional: Log headers
        if self.log_headers:
            print(f"📋 Headers: {dict(request.headers)}")

        # Optional: Log request body
        if self.log_body and request.method in ["POST", "PUT", "PATCH"]:
            body = await request.body()
            if body:
                # Replace undecodable bytes so binary payloads don't raise
                print(f"📝 Body: {body.decode(errors='replace')}")

        # Process request
        response = await call_next(request)

        # Log response info
        duration = time.time() - start_time
        status_emoji = "✅" if response.status_code < 400 else "❌"
        print(
            f"{status_emoji} Response: {response.status_code} in {duration * 1000:.1f}ms"
        )

        # Add request count to response header
        response.headers["X-Request-Count"] = str(self.request_count)

        return response


# === Setup database and agent ===
db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")

agent = Agent(
    id="demo-agent",
    name="Demo Agent",
    model=OpenAIChat(id="gpt-4o"),
    db=db,
    tools=[DuckDuckGoTools()],
    markdown=True,
)

agent_os = AgentOS(
    description="Essential middleware demo with rate limiting and logging",
    agents=[agent],
)

app = agent_os.get_app()

# Add custom middleware
app.add_middleware(
    RateLimitMiddleware,
    requests_per_minute=10,
    window_size=60,
)

app.add_middleware(
    RequestLoggingMiddleware,
    log_body=False,
    log_headers=False,
)

if __name__ == "__main__":
    # Run the middleware demo using the AgentOS serve method.
    #
    # Features:
    # 1. Rate Limiting (10 requests/minute)
    # 2. Request/Response Logging

    agent_os.serve(
        app="custom_middleware:app",
        reload=True,
    )
```
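
The sliding-window check inside `RateLimitMiddleware.dispatch` can be tried out without starting a server. The following is a minimal sketch that copies the same deque-based logic into a standalone class; `SlidingWindowLimiter` and its injectable `clock` parameter are hypothetical names introduced here for illustration and are not part of AgentOS.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Framework-free copy of the check performed in RateLimitMiddleware.dispatch."""

    def __init__(
        self,
        limit: int,
        window_size: float,
        clock: Callable[[], float] = time.time,
    ):
        self.limit = limit
        self.window_size = window_size
        self.clock = clock  # injectable so time can be simulated
        self.history: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self.clock()
        history = self.history[key]
        # Drop timestamps that have aged out of the window
        while history and now - history[0] > self.window_size:
            history.popleft()
        # Reject without recording the request, as the middleware does
        if len(history) >= self.limit:
            return False
        history.append(now)
        return True


# Simulated clock: time only advances when we change fake_now[0]
fake_now = [0.0]
limiter = SlidingWindowLimiter(limit=3, window_size=60, clock=lambda: fake_now[0])

results = [limiter.allow("127.0.0.1") for _ in range(4)]  # fourth call is rejected
other_ip = limiter.allow("10.0.0.5")  # a different key has its own history
fake_now[0] = 61.0
after_window = limiter.allow("127.0.0.1")  # old timestamps have expired
```

Because rejected requests are never appended to the history, a client that keeps retrying while limited does not extend its own lockout.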

## Usage

<Steps>
  <Snippet file="create-venv-step.mdx" />

  <Step title="Set Environment Variables">
    ```bash
    export OPENAI_API_KEY=your_openai_api_key
    ```
  </Step>

  <Step title="Install libraries">
    ```bash
    pip install -U agno openai ddgs "fastapi[standard]" uvicorn sqlalchemy pgvector psycopg
    ```
  </Step>

  <Step title="Setup PostgreSQL Database">
    ```bash
    # Using Docker
    docker run -d \
      --name agno-postgres \
      -e POSTGRES_DB=ai \
      -e POSTGRES_USER=ai \
      -e POSTGRES_PASSWORD=ai \
      -p 5532:5432 \
      pgvector/pgvector:pg17
    ```
  </Step>

  <Step title="Run Example">
    ```bash
    python custom_middleware.py
    ```
  </Step>

  <Step title="Test the Middleware">
    
    **Basic Request (observe console logging):**
    ```bash
    curl http://localhost:7777/config
    ```
    
    **Test Rate Limiting (trigger 429 errors after 10 requests):**
    ```bash
    for i in {1..15}; do curl http://localhost:7777/config; done
    ```
    
    **Check Rate Limit Headers:**
    ```bash
    curl -v http://localhost:7777/config
    ```
    
  </Step>
</Steps>
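
The rate-limit test above prints response bodies, which makes the switch to 429 easy to miss. As an alternative, here is a small sketch that tallies status codes using only the standard library. The `fetch_status` and `probe` helpers are hypothetical names, and the URL assumes the example server is running on the port used in the steps above.

```python
import urllib.error
import urllib.request
from collections import Counter
from typing import Callable


def fetch_status(url: str) -> int:
    """Return the HTTP status code for a GET request, including 4xx/5xx."""
    try:
        with urllib.request.urlopen(url, timeout=5) as response:
            return response.status
    except urllib.error.HTTPError as exc:
        # urllib raises on 4xx/5xx; the code is still what we want to count
        return exc.code


def probe(url: str, attempts: int, fetch: Callable[[str], int] = fetch_status) -> Counter:
    """Send `attempts` requests in sequence and tally the status codes."""
    return Counter(fetch(url) for _ in range(attempts))


# Example (requires the server to be running):
# print(probe("http://localhost:7777/config", attempts=15))
```

With the limit of 10 configured in this example and no other traffic from your IP in the last minute, the tally should show ten successful responses and five 429s.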

## Middleware Features

<Tabs>
  <Tab title="Rate Limiting">
    **Prevents API abuse by limiting requests per IP:**
    
    - **Configurable Limits**: Set the maximum number of requests and the window length in seconds
    - **Per-IP Tracking**: Each IP address is counted separately against the same limit
    - **Sliding Window**: Uses a sliding time window for accurate limiting
    - **Rate Limit Headers**: Tell clients their limit and how many requests they have left
    
    **Headers Added:**
    - `X-RateLimit-Limit`: Maximum requests allowed
    - `X-RateLimit-Remaining`: Requests remaining in the current window
    - `X-RateLimit-Reset`: Unix timestamp (in seconds) one full window after the current request
    
    **Customization:**
    ```python
    app.add_middleware(
        RateLimitMiddleware,
        requests_per_minute=100,  # Allow 100 requests per minute
        window_size=60,           # 60-second sliding window
    )
    ```
  </Tab>
  
  <Tab title="Request Logging">
    **Logs each request and its response to the console:**
    
    - **Request Details**: Method, path, and client IP
    - **Response Tracking**: Status code and response time in milliseconds
    - **Optional Body Logging**: Log `POST`, `PUT`, and `PATCH` request bodies for debugging
    - **Optional Header Logging**: Log request headers
    - **Request Counter**: Counts requests handled by this process and returns the total in the `X-Request-Count` header
    
    **Console Output Example:**
    ```
    🔍 Request #1: GET /config from 127.0.0.1
    ✅ Response: 200 in 45.2ms
    🔍 Request #2: POST /agents/demo-agent/runs from 127.0.0.1  
    ✅ Response: 200 in 1240.8ms
    ```
    
    **Customization:**
    ```python
    app.add_middleware(
        RequestLoggingMiddleware,
        log_body=True,     # Log request bodies
        log_headers=True,  # Log request headers
    )
    ```
  </Tab>
</Tabs>
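
With `log_headers=True`, the logging middleware prints every request header, which can include credentials such as `Authorization` or `Cookie` values. One possible mitigation is to mask those values before printing. The sketch below is an assumption-laden illustration: `redact_headers` and the `SENSITIVE_HEADERS` list are hypothetical and not part of AgentOS or FastAPI.

```python
from typing import Dict, Iterable, Mapping

# Hypothetical default list; extend it for your own deployment
SENSITIVE_HEADERS = frozenset(
    {"authorization", "cookie", "proxy-authorization", "x-api-key"}
)


def redact_headers(
    headers: Mapping[str, str],
    sensitive: Iterable[str] = SENSITIVE_HEADERS,
) -> Dict[str, str]:
    """Return a copy of `headers` with sensitive values masked."""
    blocked = {name.lower() for name in sensitive}
    return {
        name: "***" if name.lower() in blocked else value
        for name, value in headers.items()
    }


safe = redact_headers({"Authorization": "Bearer secret", "Accept": "*/*"})
```

In `RequestLoggingMiddleware.dispatch`, the header log line could then call `redact_headers(request.headers)` instead of `dict(request.headers)`.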

## Developer Resources

- [Custom Middleware Documentation](/agent-os/customize/middleware/custom)
- [FastAPI Middleware Documentation](https://fastapi.tiangolo.com/tutorial/middleware/)
